{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<i>Copyright (c) Microsoft Corporation. All rights reserved.</i>\n",
    "\n",
    "<i>Licensed under the MIT License.</i>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Running ALS on MovieLens (PySpark)\n",
    "\n",
    "Matrix factorization by [ALS](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/recommendation.html#ALS) (Alternating Least Squares) is a well known collaborative filtering algorithm.\n",
    "\n",
    "This notebook provides an example of how to utilize and evaluate ALS PySpark ML (DataFrame-based API) implementation, meant for large-scale distributed datasets. We use a smaller dataset in this example to run ALS efficiently on multiple cores of a [Data Science Virtual Machine](https://azure.microsoft.com/en-gb/services/virtual-machines/data-science-virtual-machines/)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Note**: This notebook requires a PySpark environment to run properly. Please follow the steps in [SETUP.md](https://github.com/Microsoft/Recommenders/blob/master/SETUP.md#dependencies-setup) to install the PySpark environment."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "System version: 3.6.8 |Anaconda, Inc.| (default, Dec 30 2018, 01:22:34) \n",
      "[GCC 7.3.0]\n",
      "Spark version: 2.3.1\n"
     ]
    }
   ],
   "source": [
    "# set the environment path to find Recommenders\n",
    "import sys\n",
    "sys.path.append(\"../../\")\n",
    "import pyspark\n",
    "from pyspark.ml.recommendation import ALS\n",
    "import pyspark.sql.functions as F\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.types import StructType, StructField\n",
    "from pyspark.sql.types import StringType, FloatType, IntegerType, LongType\n",
    "\n",
    "from reco_utils.common.timer import Timer\n",
    "from reco_utils.dataset import movielens\n",
    "from reco_utils.common.notebook_utils import is_jupyter\n",
    "from reco_utils.dataset.spark_splitters import spark_random_split\n",
    "from reco_utils.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation\n",
    "from reco_utils.common.spark_utils import start_or_get_spark\n",
    "\n",
    "print(\"System version: {}\".format(sys.version))\n",
    "print(\"Spark version: {}\".format(pyspark.__version__))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Set the default parameters."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "# top k items to recommend\n",
    "TOP_K = 10\n",
    "\n",
    "# Select MovieLens data size: 100k, 1m, 10m, or 20m\n",
    "MOVIELENS_DATA_SIZE = '100k'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 0. Set up Spark context\n",
    "\n",
    "The following settings work well for debugging locally on VM - change when running on a cluster. We set up a giant single executor with many threads and specify memory cap. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# the following settings work well for debugging locally on VM - change when running on a cluster\n",
    "# set up a giant single executor with many threads and specify memory cap\n",
    "spark = start_or_get_spark(\"ALS PySpark\", memory=\"16g\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1. Download the MovieLens dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 4.81k/4.81k [00:00<00:00, 19.9kKB/s]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+-------+------+---------+\n",
      "|UserId|MovieId|Rating|Timestamp|\n",
      "+------+-------+------+---------+\n",
      "|   196|    242|   3.0|881250949|\n",
      "|   186|    302|   3.0|891717742|\n",
      "|    22|    377|   1.0|878887116|\n",
      "|   244|     51|   2.0|880606923|\n",
      "|   166|    346|   1.0|886397596|\n",
      "|   298|    474|   4.0|884182806|\n",
      "|   115|    265|   2.0|881171488|\n",
      "|   253|    465|   5.0|891628467|\n",
      "|   305|    451|   3.0|886324817|\n",
      "|     6|     86|   3.0|883603013|\n",
      "|    62|    257|   2.0|879372434|\n",
      "|   286|   1014|   5.0|879781125|\n",
      "|   200|    222|   5.0|876042340|\n",
      "|   210|     40|   3.0|891035994|\n",
      "|   224|     29|   3.0|888104457|\n",
      "|   303|    785|   3.0|879485318|\n",
      "|   122|    387|   5.0|879270459|\n",
      "|   194|    274|   2.0|879539794|\n",
      "|   291|   1042|   4.0|874834944|\n",
      "|   234|   1184|   2.0|892079237|\n",
      "+------+-------+------+---------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.\n",
    "schema = StructType(\n",
    "    (\n",
    "        StructField(\"UserId\", IntegerType()),\n",
    "        StructField(\"MovieId\", IntegerType()),\n",
    "        StructField(\"Rating\", FloatType()),\n",
    "        StructField(\"Timestamp\", LongType()),\n",
    "    )\n",
    ")\n",
    "\n",
    "data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema)\n",
    "data.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2. Split the data using the Spark random splitter provided in utilities"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "N train 75193\n",
      "N test 24807\n"
     ]
    }
   ],
   "source": [
    "train, test = spark_random_split(data, ratio=0.75, seed=123)\n",
    "print (\"N train\", train.cache().count())\n",
    "print (\"N test\", test.cache().count())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3. Train the ALS model on the training data, and get the top-k recommendations for our testing data\n",
    "\n",
    "To predict movie ratings, we use the rating data in the training set as users' explicit feedback. The hyperparameters used in building the model are referenced from [here](http://mymedialite.net/examples/datasets.html). We do not constrain the latent factors (`nonnegative = False`) in order to allow for both positive and negative preferences towards movies.\n",
    "Timing will vary depending on the machine being used to train."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "header = {\n",
    "    \"userCol\": \"UserId\",\n",
    "    \"itemCol\": \"MovieId\",\n",
    "    \"ratingCol\": \"Rating\",\n",
    "}\n",
    "\n",
    "\n",
    "als = ALS(\n",
    "    rank=10,\n",
    "    maxIter=15,\n",
    "    implicitPrefs=False,\n",
    "    regParam=0.05,\n",
    "    coldStartStrategy='drop',\n",
    "    nonnegative=False,\n",
    "    seed=42,\n",
    "    **header\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Took 3.2410509269684553 seconds for training.\n"
     ]
    }
   ],
   "source": [
    "with Timer() as train_time:\n",
    "    model = als.fit(train)\n",
    "\n",
    "print(\"Took {} seconds for training.\".format(train_time.interval))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In the movie recommendation use case, recommending movies that have been rated by the users do not make sense. Therefore, the rated movies are removed from the recommended items.\n",
    "\n",
    "In order to achieve this, we recommend all movies to all users, and then remove the user-movie pairs that exist in the training dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Took 10.559875106438994 seconds for prediction.\n"
     ]
    }
   ],
   "source": [
    "with Timer() as test_time:\n",
    "\n",
    "    # Get the cross join of all user-item pairs and score them.\n",
    "    users = train.select('UserId').distinct()\n",
    "    items = train.select('MovieId').distinct()\n",
    "    user_item = users.crossJoin(items)\n",
    "    dfs_pred = model.transform(user_item)\n",
    "\n",
    "    # Remove seen items.\n",
    "    dfs_pred_exclude_train = dfs_pred.alias(\"pred\").join(\n",
    "        train.alias(\"train\"),\n",
    "        (dfs_pred['UserId'] == train['UserId']) & (dfs_pred['MovieId'] == train['MovieId']),\n",
    "        how='outer'\n",
    "    )\n",
    "\n",
    "    top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[\"train.Rating\"].isNull()) \\\n",
    "        .select('pred.' + 'UserId', 'pred.' + 'MovieId', 'pred.' + \"prediction\")\n",
    "\n",
    "    # In Spark, transformations are lazy evaluation\n",
    "    # Use an action to force execute and measure the test time \n",
    "    top_all.cache().count()\n",
    "\n",
    "print(\"Took {} seconds for prediction.\".format(test_time.interval))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+-------+----------+\n",
      "|UserId|MovieId|prediction|\n",
      "+------+-------+----------+\n",
      "|     1|    587| 3.0676804|\n",
      "|     1|    869| 2.4396753|\n",
      "|     1|   1208| 3.2788403|\n",
      "|     1|   1357| 2.0567489|\n",
      "|     1|   1677| 2.9661644|\n",
      "|     2|     80| 2.3442159|\n",
      "|     2|    472|  3.060428|\n",
      "|     2|    582|  3.489215|\n",
      "|     2|    838| 1.0985656|\n",
      "|     2|    975| 1.8764799|\n",
      "|     2|   1260| 3.0814102|\n",
      "|     2|   1381|  3.288192|\n",
      "|     2|   1530| 1.9368806|\n",
      "|     3|     22| 4.2560363|\n",
      "|     3|     57|  3.295701|\n",
      "|     3|     89|  4.983886|\n",
      "|     3|    367| 2.5427854|\n",
      "|     3|   1091| 1.4424214|\n",
      "|     3|   1167| 2.2066739|\n",
      "|     3|   1499|  3.368075|\n",
      "+------+-------+----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "top_all.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 4. Evaluate how well ALS performs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user=\"UserId\", col_item=\"MovieId\", \n",
    "                                    col_rating=\"Rating\", col_prediction=\"prediction\", \n",
    "                                    relevancy_method=\"top_k\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Model:\tALS\n",
      "Top K:\t10\n",
      "MAP:\t0.005734\n",
      "NDCG:\t0.047460\n",
      "Precision@K:\t0.051911\n",
      "Recall@K:\t0.017514\n"
     ]
    }
   ],
   "source": [
    "print(\"Model:\\tALS\",\n",
    "      \"Top K:\\t%d\" % rank_eval.k,\n",
    "      \"MAP:\\t%f\" % rank_eval.map_at_k(),\n",
    "      \"NDCG:\\t%f\" % rank_eval.ndcg_at_k(),\n",
    "      \"Precision@K:\\t%f\" % rank_eval.precision_at_k(),\n",
    "      \"Recall@K:\\t%f\" % rank_eval.recall_at_k(), sep='\\n')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 5. Evaluate rating prediction"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+-------+------+---------+----------+\n",
      "|UserId|MovieId|Rating|Timestamp|prediction|\n",
      "+------+-------+------+---------+----------+\n",
      "|   406|    148|   3.0|879540276| 2.2832825|\n",
      "|    27|    148|   3.0|891543129| 1.7940072|\n",
      "|   606|    148|   3.0|878150506| 3.7863157|\n",
      "|   916|    148|   2.0|880843892| 2.3045797|\n",
      "|   236|    148|   4.0|890117028| 1.9480721|\n",
      "|   602|    148|   4.0|888638517| 3.1172547|\n",
      "|   663|    148|   4.0|889492989| 2.7976327|\n",
      "|   372|    148|   5.0|876869915|  4.170663|\n",
      "|   190|    148|   4.0|891033742| 3.6491241|\n",
      "|     1|    148|   2.0|875240799|  2.829558|\n",
      "|   297|    148|   3.0|875239619| 2.1554093|\n",
      "|   178|    148|   4.0|882824325|  3.932391|\n",
      "|   308|    148|   3.0|887740788| 2.9132738|\n",
      "|   923|    148|   4.0|880387474| 3.5403519|\n",
      "|    54|    148|   3.0|880937490|  3.165133|\n",
      "|   430|    148|   2.0|877226047|  2.891675|\n",
      "|    92|    148|   2.0|877383934| 2.6483998|\n",
      "|   447|    148|   4.0|878854729| 3.1101565|\n",
      "|   374|    148|   4.0|880392992| 2.2130618|\n",
      "|   891|    148|   5.0|891639793|  3.138905|\n",
      "+------+-------+------+---------+----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Generate predicted ratings.\n",
    "prediction = model.transform(test)\n",
    "prediction.cache().show()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Model:\tALS rating prediction\n",
      "RMSE:\t0.967296\n",
      "MAE:\t0.753306\n",
      "Explained variance:\t0.261864\n",
      "R squared:\t0.255480\n"
     ]
    }
   ],
   "source": [
    "rating_eval = SparkRatingEvaluation(test, prediction, col_user=\"UserId\", col_item=\"MovieId\", \n",
    "                                    col_rating=\"Rating\", col_prediction=\"prediction\")\n",
    "\n",
    "print(\"Model:\\tALS rating prediction\",\n",
    "      \"RMSE:\\t%f\" % rating_eval.rmse(),\n",
    "      \"MAE:\\t%f\" % rating_eval.mae(),\n",
    "      \"Explained variance:\\t%f\" % rating_eval.exp_var(),\n",
    "      \"R squared:\\t%f\" % rating_eval.rsquared(), sep='\\n')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if is_jupyter():\n",
    "    # Record results with papermill for tests\n",
    "    import papermill as pm\n",
    "    pm.record(\"map\", rank_eval.map_at_k())\n",
    "    pm.record(\"ndcg\", rank_eval.ndcg_at_k())\n",
    "    pm.record(\"precision\", rank_eval.precision_at_k())\n",
    "    pm.record(\"recall\", rank_eval.recall_at_k())\n",
    "    pm.record(\"rmse\", rating_eval.rmse())\n",
    "    pm.record(\"mae\", rating_eval.mae())\n",
    "    pm.record(\"exp_var\", rating_eval.exp_var())\n",
    "    pm.record(\"rsquared\", rating_eval.rsquared())\n",
    "    pm.record(\"train_time\", train_time.interval)\n",
    "    pm.record(\"test_time\", test_time.interval)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "# cleanup spark instance\n",
    "spark.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python (reco_pyspark)",
   "language": "python",
   "name": "reco_pyspark"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
